CONUS404 Temporal Aggregation#

Create daily averages from hourly data and write them to a Zarr dataset.

import fsspec
import xarray as xr
import hvplot.xarray
import intake
import os
import warnings
from dask.distributed import LocalCluster, Client
warnings.filterwarnings('ignore')

Open dataset from Intake Catalog#

  • Automatically select on-prem dataset from /caldera if running on prem (Denali/Tallgrass)

  • Automatically select cloud data on S3 if not running on prem

To test whether we are on-prem, we check whether the SLURM_CLUSTER_NAME environment variable is defined. If it is not defined, the user is either not on Denali/Tallgrass, or is on the login node, which should not be used for computation.

url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
# open the hytest data intake catalog
hytest_cat = intake.open_catalog(url)
list(hytest_cat)
['conus404-drb-eval-tutorial-catalog',
 'nhm-v1.0-daymet-catalog',
 'nhm-v1.1-c404-bc-catalog',
 'nhm-v1.1-gridmet-catalog',
 'conus404-catalog',
 'nwis-streamflow-usgs-gages-onprem',
 'nwis-streamflow-usgs-gages-cloud',
 'nwm21-streamflow-usgs-gages-onprem',
 'nwm21-streamflow-usgs-gages-cloud',
 'nwm21-streamflow-cloud',
 'nwm21-scores',
 'lcmap-cloud',
 'rechunking-tutorial-cloud']
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)
['conus404-hourly-onprem',
 'conus404-hourly-cloud',
 'conus404-hourly-osn',
 'conus404-daily-diagnostic-onprem',
 'conus404-daily-diagnostic-cloud',
 'conus404-daily-onprem',
 'conus404-daily-cloud',
 'conus404-monthly-onprem',
 'conus404-monthly-cloud']

Start a Dask client using an appropriate Dask cluster#

This is an optional step, but it can speed up data loading significantly, especially when accessing data from the cloud.

def configure_cluster(machine):
    ''' Helper function to configure cluster
    '''
    if machine == 'denali':
        from dask.distributed import LocalCluster, Client
        cluster = LocalCluster(threads_per_worker=1)
        client = Client(cluster)
    
    elif machine == 'tallgrass':
        from dask.distributed import Client
        from dask_jobqueue import SLURMCluster
        cluster = SLURMCluster(queue='cpu', cores=1, 
                               walltime="01:00:00", account="woodshole",
                               interface='ib0', memory='6GB')
        cluster.adapt(maximum=10)
        client = Client(cluster)
        
    elif machine == 'local':
        import os
        import warnings
        warnings.warn("Running locally can result in costly data transfers!\n")
        n_cores = os.cpu_count() # set to match your machine
        cluster = LocalCluster(threads_per_worker=n_cores)
        client = Client(cluster)
        
    elif machine in ['esip-qhub-gateway-v0.4']:   
        import sys, os
        sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
        import ebdpy as ebd
        aws_profile = 'esip-qhub'
        aws_region = 'us-west-2'
        endpoint = f's3.{aws_region}.amazonaws.com'
        ebd.set_credentials(profile=aws_profile, region=aws_region, endpoint=endpoint)
        worker_max = 10
        client,cluster = ebd.start_dask_cluster(profile=aws_profile, worker_max=worker_max, 
                                              region=aws_region, use_existing_cluster=True,
                                              adaptive_scaling=False, wait_for_cluster=False, 
                                              worker_profile='Medium Worker', propagate_env=True)
        
    return client, cluster
if 'SLURM_CLUSTER_NAME' in os.environ:
    dataset = 'conus404-hourly-onprem'
    machine = os.environ['SLURM_CLUSTER_NAME']
    client, cluster = configure_cluster(machine)
else:
    dataset = 'conus404-hourly-cloud'
    machine = 'esip-qhub-gateway-v0.4'
    client, cluster = configure_cluster(machine)
Region: us-west-2
No Cluster running.
Starting new cluster.
{}
Setting Cluster Environment Variable AWS_DEFAULT_REGION us-west-2
Setting Fixed Scaling workers=10
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://nebari.esipfed.org/gateway/clusters/dev.123ba0c4fce94a948a67036dc5fffb2b/status
Propagating environment variables to workers
Using environment: users/users-pangeofu
ds = cat[dataset].to_dask()
ds
<xarray.Dataset>
Dimensions:         (time: 368064, y: 1015, x: 1367, bottom_top_stag: 51,
                     bottom_top: 50, soil_layers_stag: 4, x_stag: 1368,
                     y_stag: 1016, snow_layers_stag: 3, snso_layers_stag: 7)
Coordinates:
    lat             (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lat_u           (y, x_stag) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lat_v           (y_stag, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon             (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon_u           (y, x_stag) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon_v           (y_stag, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * time            (time) datetime64[ns] 1979-10-01 ... 2021-09-25T23:00:00
  * x               (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y               (y) float64 -2.028e+06 -2.024e+06 ... 2.024e+06 2.028e+06
Dimensions without coordinates: bottom_top_stag, bottom_top, soil_layers_stag,
                                x_stag, y_stag, snow_layers_stag,
                                snso_layers_stag
Data variables: (12/157)
    ACDEWC          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACDRIPR         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACDRIPS         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACECAN          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACEDIR          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACETLSM         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ...              ...
    ZNU             (bottom_top) float32 dask.array<chunksize=(50,), meta=np.ndarray>
    ZNW             (bottom_top_stag) float32 dask.array<chunksize=(51,), meta=np.ndarray>
    ZS              (soil_layers_stag) float32 dask.array<chunksize=(4,), meta=np.ndarray>
    ZSNSO           (time, snso_layers_stag, y, x) float32 dask.array<chunksize=(144, 7, 175, 175), meta=np.ndarray>
    ZWT             (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    crs             int32 ...
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...
ds.SNOW
<xarray.DataArray 'SNOW' (time: 368064, y: 1015, x: 1367)>
dask.array<open_dataset-60f4cd5b1dab559310716eb4d524a8baSNOW, shape=(368064, 1015, 1367), dtype=float32, chunksize=(144, 175, 175), chunktype=numpy.ndarray>
Coordinates:
    lat      (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon      (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * time     (time) datetime64[ns] 1979-10-01 ... 2021-09-25T23:00:00
  * x        (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y        (y) float64 -2.028e+06 -2.024e+06 -2.02e+06 ... 2.024e+06 2.028e+06
Attributes:
    description:   SNOW WATER EQUIVALENT
    grid_mapping:  crs
    long_name:     Snow water equivalent
    units:         kg m-2

Daily averages#

Time averages of any type are easy to compute with xarray. Here we compute 24-hour averages and set the time offset to 12 hours, so that the time values fall in the middle of each averaging period.

Digital Earth Africa has a great Working with Time in Xarray tutorial.
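As a quick, self-contained illustration of the time-centering idea (using synthetic hourly data rather than CONUS404), the same result can be obtained by shifting the bin-start labels forward 12 hours. Note that the `loffset` argument used in the notebook cell below has been removed in newer xarray releases, so this sketch shifts the time coordinate explicitly:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Two days of synthetic hourly data
time = pd.date_range("2017-01-02", periods=48, freq="h")
da = xr.DataArray(np.arange(48.0), coords={"time": time}, dims="time", name="T2")

# 24-hour means; resample labels each bin at its start (00:00),
# so shift the labels by 12 hours to center them on the day
daily = da.resample(time="24h").mean()
daily = daily.assign_coords(time=daily["time"] + pd.Timedelta("12h"))

print(daily.values)              # [11.5 35.5]
print(daily["time"].values[0])   # 2017-01-02T12:00:00...
```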

In the example below we just do a few days with a few variables as a quick demo.

%%time
ds_subset = ds[['T2','U10']].sel(time=slice('2017-01-02','2017-01-13'))
CPU times: user 14.9 ms, sys: 0 ns, total: 14.9 ms
Wall time: 14.4 ms
ds_subset_daily = ds_subset.resample(time="24H", loffset="12H").mean()
ds_subset_daily
<xarray.Dataset>
Dimensions:  (time: 12, y: 1015, x: 1367)
Coordinates:
    lon      (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * x        (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y        (y) float64 -2.028e+06 -2.024e+06 -2.02e+06 ... 2.024e+06 2.028e+06
    lat      (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * time     (time) datetime64[ns] 2017-01-02T12:00:00 ... 2017-01-13T12:00:00
Data variables:
    T2       (time, y, x) float32 dask.array<chunksize=(6, 175, 175), meta=np.ndarray>
    U10      (time, y, x) float32 dask.array<chunksize=(6, 175, 175), meta=np.ndarray>
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...
ds_subset_daily.hvplot.quadmesh(x='lon', y='lat', rasterize=True,
                                geo=True, tiles='OSM', alpha=0.7, cmap='turbo')

Write daily values as a Zarr dataset (to onprem or cloud)#

%%time
if 'SLURM_CLUSTER_NAME' in os.environ:     # on prem (Caldera filesystem)
    ds_subset_daily.to_zarr('/caldera/usgs/change-me/conus_subset_daily.zarr', mode='w', consolidated=True)
else:                                      # cloud (AWS S3 nhgf-development bucket)
    fs_s3 = fsspec.filesystem('s3', anon=False)
    ds_subset_daily.to_zarr(fs_s3.get_mapper('s3://esip-qhub/testing/conus_subset_daily.zarr'), mode='w', consolidated=True)
CPU times: user 1.31 s, sys: 54.6 ms, total: 1.36 s
Wall time: 13.2 s

Shutdown cluster#

cluster.shutdown()